Skip to content

Conversation

cpegeric
Copy link
Contributor

@cpegeric cpegeric commented May 27, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #21835

What this PR does / why we need it:

To update the HNSW index via CDC changes.

The design doc:
https://github.com/cpegeric/mo-docs/blob/hnsw_cdc/design/mo/sql/20250501-cpegeric-hnswsync.md


PR Type

Enhancement, Tests


Description

• Implement comprehensive HNSW index CDC (Change Data Capture) synchronization functionality
• Add new HnswSync struct and hnswCdcUpdate SQL function for processing CDC updates via multi-threaded operations
• Introduce hnswSyncSinker for updating HNSW indexes with CDC changes, supporting both float32 and float64 vector types
• Refactor HNSW architecture by replacing HnswBuildIndex and HnswSearchIndex with unified HnswModel structure
• Add CDC data structures (VectorIndexCdc, VectorIndexCdcEntry) and operations for insert, update, delete operations
• Implement transaction-aware SQL execution with RunTxn function and enhanced error handling
• Add comprehensive test suites covering CDC sinker functionality, synchronization operations, and model operations
• Integrate CDC task creation into HNSW index creation workflow with automatic cleanup placeholders
• Enhance array casting with dimension validation and standardize error message formats across vector operations
• Add distributed test cases for HNSW CDC synchronization scenarios including bulk loads and incremental updates


Changes walkthrough 📝

Relevant files
Tests
12 files
hnsw_sinker_test.go
Add comprehensive test suite for HNSW CDC sinker                 

pkg/cdc/hnsw_sinker_test.go

• Comprehensive test suite for HNSW CDC sinker functionality with 692
lines of test code
• Mock implementations for SQL executors and error
handling scenarios
• Test cases covering sinker creation, execution,
error handling, and data processing
• Tests for snapshot and atomic
batch processing with vector data

+692/-0 
sync_test.go
Add test suite for HNSW CDC synchronization                           

pkg/vectorindex/hnsw/sync_test.go

• Test suite for HNSW CDC synchronization with various operation
scenarios
• Tests covering upsert, delete, insert operations with
single and multiple models
• Mock implementations for SQL execution
and streaming operations
• Shuffle testing for concurrent operation
handling

+370/-0 
search_test.go
Enhance HNSW search tests with multi-file support               

pkg/vectorindex/hnsw/search_test.go

• Added mock functions for catalog SQL operations and multi-file
scenarios
• New test helper functions for creating metadata and index
batches
• Enhanced test coverage for search operations with multiple
index files

+106/-0 
model_test.go
Add comprehensive test suite for HnswModel functionality 

pkg/vectorindex/hnsw/model_test.go

• Added comprehensive test suite for HnswModel functionality
• Tests
cover search operations, loading/unloading, add/remove operations, and
SQL generation
• Includes edge case testing for nil model scenarios

Uses mock SQL functions for testing database interactions

+206/-0 
func_hnsw_test.go
Add test cases for HNSW CDC update function                           

pkg/sql/plan/function/func_hnsw_test.go

• Added test cases for hnswCdcUpdate function
• Tests various error
conditions including null arguments and invalid JSON
• Validates
function parameter validation and error handling

+129/-0 
build_test.go
Update HNSW build tests to use new HnswModel structure     

pkg/vectorindex/hnsw/build_test.go

• Updated test code to use HnswModel instead of HnswSearchIndex

Changed function call from NewHnswBuildIndex to NewHnswModelForBuild

Updated struct initialization to use new model type

+5/-5     
types_test.go
Add test cases for vector index CDC functionality               

pkg/vectorindex/types_test.go

• Added test cases for CDC functionality
• Tests Insert, Delete,
Upsert operations and JSON serialization
• Validates CDC data
structure behavior and state management

+63/-0   
sinker_test.go
Update sinker tests for new function signature                     

pkg/cdc/sinker_test.go

• Updated test calls to NewSinker to include the new cnUUID parameter

• Maintains test compatibility with updated function signature

+1/-1     
cdc_test.go
Update CDC test mocks for new sinker signature                     

pkg/frontend/cdc_test.go

• Updated mock NewSinker stub to include cnUUID parameter
• Maintains
test compatibility with updated function signature

+1/-1     
function_id_test.go
Update function ID tests for HNSW CDC function                     

pkg/sql/plan/function/function_id_test.go

• Updated predefined function IDs to include HNSW_CDC_UPDATE

Incremented FUNCTION_END_NUMBER to maintain test consistency

+3/-1     
vector_hnsw_sync.result
Add test results for HNSW CDC synchronization functionality

test/distributed/cases/vector/vector_hnsw_sync.result

• Added test results for HNSW CDC synchronization scenarios
• Covers
empty data, bulk load, and incremental update test cases
• Validates
vector search functionality after CDC operations

+81/-0   
vector_hnsw_sync.sql
Add comprehensive HNSW CDC synchronization test cases       

test/distributed/cases/vector/vector_hnsw_sync.sql

• Added comprehensive test cases for HNSW CDC synchronization
• Tests
PITR and CDC task creation, data operations, and vector searches

Includes scenarios for empty tables, bulk loads, and incremental
updates

+113/-0 
Feature
6 files
sync.go
Implement HNSW index CDC synchronization functionality     

pkg/vectorindex/hnsw/sync.go

• New CDC synchronization functionality for HNSW index updates via SQL
function hnsw_cdc_update()
HnswSync struct for managing CDC
operations with insert, update, delete operations
• Multi-threaded
processing support with concurrent model loading and vector operations

• SQL generation for metadata and storage table updates

+631/-0 
hnsw_sinker.go
Add HNSW CDC sinker for vector index updates                         

pkg/cdc/hnsw_sinker.go

• New hnswSyncSinker implementation for updating HNSW indexes via CDC
changes
• Support for both float32 and float64 vector types with
JSON-based CDC updates
• Transaction-based SQL execution with error
handling and rollback support
• Processing of snapshot and tail data
with atomic batch operations

+573/-0 
func_hnsw.go
Implement HNSW CDC update function for vector index synchronization

pkg/sql/plan/function/func_hnsw.go

• Implemented hnswCdcUpdate function for processing CDC updates

Validates input parameters (database name, table name, dimension, CDC
JSON)
• Calls hnsw.CdcSync to perform the actual synchronization

Includes comprehensive error handling and logging

+77/-0   
util.go
Add CDC task generation for HNSW index synchronization     

pkg/sql/compile/util.go

• Added genCdcHnswIndex function to generate CDC task creation SQL

Creates PITR and CDC task SQL statements for HNSW index
synchronization
• Includes placeholder logic for future CDC task
registration

+38/-0   
list_builtIn.go
Register HNSW CDC update function in built-in functions   

pkg/sql/plan/function/list_builtIn.go

• Added HNSW_CDC_UPDATE function definition to built-in functions list

• Configured function signature with varchar and int32 parameters
returning uint64

+21/-0   
ddl_index_algo.go
Integrate CDC task creation into HNSW index creation         

pkg/sql/compile/ddl_index_algo.go

• Added call to genCdcHnswIndex in vector HNSW index handling

Executes generated CDC SQL statements during index creation

+13/-0   
Enhancement
9 files
model.go
Refactor HNSW model with CDC support and enhanced operations

pkg/vectorindex/hnsw/model.go

• New HnswModel struct replacing HnswBuildIndex with enhanced CDC
support
• Added dirty tracking, atomic length counters, and view mode
support
• Enhanced file operations with checksum validation and
streaming SQL loading
• Methods for concurrent vector operations and
model lifecycle management

+524/-0 
types.go
Add CDC data structures and operations for vector index   

pkg/vectorindex/types.go

• Added CDC-related constants (CDC_INSERT, CDC_UPSERT, CDC_DELETE)

Introduced VectorIndexCdc and VectorIndexCdcEntry structs for CDC
operations
• Added HnswCdcParam struct for CDC parameters

Implemented CDC data manipulation methods (Insert, Upsert, Delete,
ToJson)

+85/-0   
sinker.go
Add HNSW sync sinker support and improve error handling   

pkg/cdc/sinker.go

• Added support for CDCSinkType_HnswSync sink type
• Updated NewSinker
function signature to include cnUUID parameter
• Fixed potential nil
pointer dereference in error handling

+9/-2     
ddl.go
Add placeholder logic for CDC task cleanup in DDL operations

pkg/sql/compile/ddl.go

• Added TODO comments for CDC task cleanup in DropIndex and DropTable
methods
• Placeholder logic for cleaning up CDC tasks when dropping
vector/fulltext indexes

+9/-0     
sqlexec.go
Add transaction-aware SQL execution function                         

pkg/vectorindex/sqlexec/sqlexec.go

• Added RunTxn function for executing SQL operations within
transactions
• Provides transaction-aware SQL execution with proper
context and options setup

+27/-0   
func_cast.go
Enhance array casting with dimension validation                   

pkg/sql/plan/function/func_cast.go

• Enhanced array casting with dimension validation
• Added bypass for
max dimension check when width equals MaxArrayDimension
• Improved
error handling for dimension mismatches

+11/-2   
hnsw.go
Relax table scan validation in HNSW query building             

pkg/sql/plan/hnsw.go

• Commented out table scan validation in buildHnswCreate
• Relaxed
constraints on child node type checking

+6/-4     
cdc_options.go
Add HNSW sync sink type support in CDC options                     

pkg/frontend/cdc_options.go

• Added support for CDCSinkType_HnswSync in CDC options validation

Extended sink type validation to include HNSW sync type

+1/-1     
cdc_exector.go
Update CDC executor to pass CN UUID to sinker                       

pkg/frontend/cdc_exector.go

• Updated NewSinker call to include cnUUID parameter
• Passes
executor's CN UUID to sinker creation

+1/-0     
Code refactoring
2 files
build.go
Refactor HNSW build to use unified model structure             

pkg/vectorindex/hnsw/build.go

• Refactored to use HnswModel instead of HnswBuildIndex for
consistency
• Removed duplicate HnswBuildIndex struct and related
methods
• Updated build operations to work with the new unified model
structure

+11/-182
search.go
Refactor HNSW search to use HnswModel instead of HnswSearchIndex

pkg/vectorindex/hnsw/search.go

• Removed HnswSearchIndex struct and related methods (loadChunk,
LoadIndex, Search)
• Replaced HnswSearchIndex with HnswModel in the
HnswSearch struct
• Refactored LoadMetadata to be a standalone
function and updated field mappings
• Updated LoadIndex method to use
new HnswModel.LoadIndex signature

+8/-141 
Configuration changes
2 files
function_id.go
Add function ID for HNSW CDC update function                         

pkg/sql/plan/function/function_id.go

• Added HNSW_CDC_UPDATE function ID constant
• Updated
FUNCTION_END_NUMBER and function registry mapping

+7/-1     
types.go
Add HNSW sync sink type constant                                                 

pkg/cdc/types.go

• Added CDCSinkType_HnswSync constant for HNSW synchronization sink
type

+4/-3     
Miscellaneous
3 files
vector_hnsw.result
Update vector dimension error message format                         

test/distributed/cases/vector/vector_hnsw.result

• Updated error message format for dimension mismatch from "vector ops
between different dimensions" to "expected vector dimension X !=
actual dimension Y"

+1/-1     
vector_index.result
Update vector index error message format                                 

test/distributed/cases/vector/vector_index.result

• Updated error message format for dimension mismatch to use new
standardized format

+1/-1     
array.result
Update array dimension error message format                           

test/distributed/cases/array/array.result

• Updated error messages for array dimension mismatches to use new
standardized format

+2/-2     

Need help?
  • Type /help how to ... in the comments thread for any questions about Qodo Merge usage.
  • Check out the documentation for more information.
  • Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Labels
    kind/feature Review effort 4/5 size/XXL Denotes a PR that changes 2000+ lines
    Projects
    None yet
    Development

    Successfully merging this pull request may close these issues.

    9 participants